gRPC 学习之-高级知识

December 26, 2021

高级知识

之前的文章中记录了对 gRPC 的使用,但是都没有涉及底层的通信基础知识。这篇文章主要介绍 gRPC 的高级知识,涉及到 gRPC 的底层原理、

gRPC 底层原理

gRPC 超越基础知识

这部分主要讲述在构建真正的gRPC应用时,对于需要增强它们的各种能力,比如:拦截gRPC的输入和输出、弹性处理网络延迟、处理错误、在服务和消费者之间共享元数据。

拦截器

gRPC 中,可以拦截 gRPC 的执行.来满足特定的需求,如日志、认证、性能度盐指标等,这会使用一种名为拦截器的扩展机制。它是 gRPC 核心扩展机制之一 ,在一些使用场呆中非常有用,比如日志、身份验证、授权、性能度盘指标、跟踪以及其他一些自定义错求.

根据所拦截的 RPC 调用类型,gRPC 拦截可以分为两类,对于一元RPC,使用一元拦截器,对于流RPC,使用流拦截器;这些拦截器既可以用在服务端也可以用在客户端。

服务端拦截器

当客户端调用 gRPC服务的远程方法肘,通过使用服务器端拦截器,可以在执行远程方法 之前,执行一个通用的逻辑。

grpc-server-intercepter

在服务器销, 一元拦截器拦截一元 RPC,梳拦iliV:ìI售出1拦截流 RPC. 下回来右一下服务器揣 一元拦敲器.

  • 一元拦截

要实现这一点 ,需要先实现 UnaryServerlnterceptor 类型的函数, 并在创 gRPC 服务器端时将函数注册进来。 UnaryServerlnterceptor 是用于服务器端一元拦截的类型。函数定义类型如下。

func(ctx context.Context, req interface{}, info 叫JnaryServerlnfo,
hand1er UnaryHand1er) (re5p interface{}, err error)

例如:进行一元拦截,打印日志。

func orderUnaryServerlnterceptor(ctx context.Context, req interface{},
	info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	log.Println("======= [Server Interceptor1 ", info.FullMethod)
	m, err := handler(ctx, req)
	log.Printf("post proc mess: %s", m)
    // 可以拦截修改返回内容
    if order, ok := m.(*pb.Order); ok {
		order.Price = 100
		return order, err
	}
	return m, err
}

func main() {
    ....省略
	s := grpc.NewServer(grpc.UnaryInterceptor(orderUnaryServerlnterceptor))
	pb.RegisterOrderManagementServer(s, &server{})
	if err := s.Serve(listen); err != nil {
		panic(err)
	}
}
  • 流拦截

服务器端流拦截器会拦截 gRPC 服务器所处理的所有流RPC。 流拦截器包括前置处理阶段和流操作拦截阶段。

要实现流拦截,需要实现Streal'1Serverlnterceptor 函数:

func (srv interface{}, ss grpc.ServerStream,info *grpc.StreamServerInfo,handler grpc.StreamHandler) error  {

}

grpc.ServerStream 的包装器可以拦截 gRPC 服务发送或接收到的数据,它实现了 SendMsg 函数和 RecvMsg 函数,这两个函数分别会在服务发送和接收 RPC 流消息的时候调用。

在流 RPC 进入服务前,可以通过 grpc.ServeStream 进行流的处理,再通过 grpc.StreamHandler 来进行调用。

例如:在 OrderManager 的流接口中先进行输入流的处理,再调用 RPC.

type wrappedSteam struct {
	grpc.ServerStream
}

var _ grpc.ServerStream = &wrappedSteam{}

func (w *wrappedSteam) RecvMsg(m interface{}) error {
	log.Printf("=====RecvMSg==== %T", m)
	return w.ServerStream.RecvMsg(m)
}

func (w *wrappedSteam) SendMsg(m interface{}) error {
	log.Println("====SendMsg====", m)
	return w.ServerStream.SendMsg(m)
}

func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {
	return &wrappedSteam{s}
}

func orderServerStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	log.Println("===== [Server Steam Interceptor ]", info.FullMethod) 
	err := handler(srv, newWrappedStream(ss))
	if err != nil {
		log.Fatalf("RPC failed with error %v", err)
	}
	return err
}

func main() {
	...
	s := grpc.NewServer(grpc.StreamInterceptor(orderServerStreamInterceptor))
	pb.RegisterOrderManagementServer(s, &server{})
	if err := s.Serve(listen); err != nil {
		panic(err)
	}
}

在流开始时会进入 orderServerStreamInterceptor 方法,流结束后才会推出,在流每次 RecvMsg 时会进入到流拦截的 (w *wrappedSteam) RecvMsg 方法中,SendMsg 同理。

客户端拦截器

grpc-client-interceptor

  • 一元拦截

一元拦截的函数签名:

type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

与服务端拦截一元拦截器一样,客户端一元拦截器也有不同的阶段。

例子:

func orderClientInterceptor(ctx context.Context, method string, req, apply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	log.Println("Method :" + method)
	start := time.Now()
	err := invoker(ctx, method, req, apply, cc, opts...)
	log.Printf("cost is [%d]ms",time.Now().Sub(start).Milliseconds())
	return err
}

func main(){
	conn, err := grpc.Dial(address, grpc.WithInsecure(),grpc.WithUnaryInterceptor(orderClientInterceptor))
}
  • 流拦截
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

例子:

type wrappedSteam struct {
	grpc.ClientStream
}

func (w *wrappedSteam) RecvMsg(m interface{}) error {
	log.Printf("===== [ Client Stream Interceptor ] Receive a message (Type :%T) at %v", m, time.Now().Format(time.RFC3339))
	return w.ClientStream.RecvMsg(m)
}

func (w *wrappedSteam) SendMsg(m interface{}) error {
	log.Printf("===== [Client Stream Interceptor ] Send a message (Type :%T) at %v",m, time.Now().Format(time.RFC3339))
	return w.ClientStream.SendMsg(m)
}

func newWrappedStream(cs grpc.ClientStream) grpc.ClientStream {
	return &wrappedSteam{cs}
}

func orderClientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	log.Println("===== [Client Interceptor] ",method)
	s, err := streamer(ctx, desc, cc, method, opts...)
	return newWrappedStream(s), err
}

func main(){
	conn, err := grpc.Dial(address, grpc.WithInsecure(),grpc.WithStreamInterceptor(orderClientStreamInterceptor))
}

截止时间


LRF 记录学习、生活的点滴